package cn.xinfei.xdecision.pulsar;

import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.schema.StringSchema;

import java.util.UUID;
import java.util.concurrent.*;

public class PulsarDemo {
    public static void main(String[] args) throws PulsarClientException {
        ExecutorService executorService = Executors.newFixedThreadPool(500);
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://10.10.200.123:6650,10.10.200.124:6650,10.10.200.125:6650")
//                .serviceUrl("pulsar://192.168.60.95:6650")
                .ioThreads(10)
                .listenerThreads(1000)
                .operationTimeout(15, TimeUnit.SECONDS)
                .connectionTimeout(10, TimeUnit.SECONDS)
                .connectionsPerBroker(1)
                .build();
        Producer<String> productBuilder = client.newProducer(new StringSchema())
                .topic("persistent://risk/xdecision/shiyusen-p1")
                .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
                .sendTimeout(10, TimeUnit.SECONDS)
                .blockIfQueueFull(true)
                .batchingMaxMessages(1000)
                .batchingMaxBytes(12800000)
                .enableBatching(true)
                .maxPendingMessages(100000000)
//                .compressionType(CompressionType.LZ4)
                .create();

        for (int i = 0; i < 100000; i++) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (int i = 0; i < 1000; i++) {
                            CompletableFuture<MessageId> messageIdCompletableFuture = productBuilder.sendAsync(msg + i);

                            int finalI = i;
                            messageIdCompletableFuture.handleAsync((msgId, ex) -> {
                                if (ex != null) {
                                    System.out.println("message send fail,topic={},msgId={},producerDuration={},retry={}");
                                    ex.printStackTrace();
                                }else{
                                    System.out.println("message send success,topic={},msgId={},messageId={},producerDuration={},retry={}" + msgId);
                                }
                                ThreadPoolExecutor rt= (ThreadPoolExecutor) executorService;
                                System.out.println(finalI+":"+rt.getActiveCount());
                                return null;
                            });
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        try {
            Thread.sleep(1000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        productBuilder.close();
    }

    static String msg = "{\n" +
            "    \"_id\" : ObjectId(\"64c086a23dcadb4d9ff32080\"),\n" +
            "    \"branchRejectInfo\" : [\n" +
            "\n" +
            "    ],\n" +
            "    \"createTime\" : \"2023-07-26 10:36:18\",\n" +
            "    \"decisionId\" : \"20230726103608572shiyusen00007\",\n" +
            "    \"engineCode\" : \"jcl_20230722000001\",\n" +
            "    \"input\" : {\n" +
            "        \"innerResult\" : \"ssr\",\n" +
            "        \"biz_flow_number\" : \"xxxx\",\n" +
            "        \"sex_ceshi\" : 1.0,\n" +
            "        \"user_utm_source\" : \"XYF01-SHUMING\",\n" +
            "        \"age_ceshi\" : 24.0,\n" +
                "        \"age\" : 20.0\n" +
            "    },\n" +
            "    \"requestId\" : \"285658630\",\n" +
            "    \"ruleHitList\" : [\n" +
            "\n" +
            "    ],\n" +
            "    \"updateTime\" : \"2023-07-26 10:36:19\"\n" +
            "}\n" +
            "{\n" +
            "    \"_id\" : ObjectId(\"64c086a63dcadb4d9ff3208d\"),\n" +
            "    \"branchRejectInfo\" : [\n" +
            "        {\n" +
            "            \"finalRejectName\" : \"年龄拒绝\",\n" +
            "            \"prohibitedApplyTime\" : NumberInt(0),\n" +
            "            \"rejectBranchNames\" : \"\",\n" +
            "            \"rejectNames\" : \"\"\n" +
            "        }\n" +
            "    ],\n" +
            "    \"createTime\" : \"2023-07-26 10:36:19\",\n" +
            "    \"decisionId\" : \"20230726103608572shiyusen00007_1_1\",\n" +
            "    \"engineCode\" : \"jcl_20230602000001\",\n" +
            "    \"input\" : {\n" +
            "    \"_id\" : ObjectId(\"64c086a73dcadb4d9ff32097\"),\n" +
            "            \"ruleCode\" : \"fzgz_20230530000009\",\n" +
            "            \"ruleHitFlag\" : false,\n" +
            "            \"ruleId\" : NumberInt(388),\n" +
            "            \"ruleName\" : \"百融多头\",\n" +
            "            \"ruleScore\" : 0.0,\n" +
            "            \"ruleVersionId\" : NumberInt(1098),\n" +
            "            \"versionNo\" : NumberInt(3)\n" +
            "        },\n" +
            "        {\n" +
            "            \"blockId\" : NumberInt(1568),\n" +
            "            \"ruleCode\" : \"fzgz_20230530000008\",\n" +
            "            \"ruleHitFlag\" : false,\n" +
            "            \"ruleId\" : NumberInt(392),\n" +
            "            \"ruleName\" : \"rule1\",\n" +
            "            \"ruleScore\" : 0.0,\n" +
            "            \"ruleVersionId\" : NumberInt(1110),\n" +
            "            \"versionNo\" : NumberInt(3)\n" +
            "        },\n" +
            "        {\n" +
            "            \"blockId\" : NumberInt(1570),\n" +
            "            \"ruleCode\" : \"fzgz_20230530000008\",\n" +
            "            \"ruleHitFlag\" : false,\n" +
            "            \"ruleId\" : NumberInt(392),\n" +
            "            \"ruleName\" : \"rule1\",\n" +
            "            \"ruleScore\" : 0.0,\n" +
            "            \"ruleVersionId\" : NumberInt(1110),\n" +
            "            \"versionNo\" : NumberInt(3)\n" +
            "        }\n" +
            "    ],\n" +
            "    \"updateTime\" : \"2023-07-26 10:36:23\"\n" +
            "}\n" +
            "{\n" +
            "    \"_id\" : ObjectId(\"64c086a73dcadb4d9ff32099\"),\n" +
            "    \"branchRejectInfo\" : [\n" +
            "\n" +
            "    ],\n" +
            "    \"createTime\" : \"2023-07-26 10:36:23\",\n" +
            "    \"decisionId\" : \"20230726103608572shiyusen00007_1_1\",\n" +
            "    \"engineCode\" : \"jcl_20230602000001\",\n" +
            "    \"input\" : {\n" +
            "        \"bairong_loan_als_m12_cell_bank_week_orgnum\" : NumberInt(-99),\n" +
            "        \"tanzhi_210\" : NumberInt(0)\n" +
            "    },\n" +
            "    \"ips\" : [\n" +
            "        \"p_192.168.60.95\",\n" +
            "        \"c_192.168.60.95\"\n" +
            "    ],\n" +
            "    \"nodeCode\" : \"jcl_20230602000001_19\",\n" +
            "    \"nodeType\" : \"ruleBase\",\n" +
            "    \"output\" : {\n" +
            "        \"jcl_20230602000001_19_duration\" : NumberInt(1),\n" +
            "        \"consume_user_level_output\" : \"3\",\n" +
            "        \"risk_decision_label\" : \"miss\",\n" +
            "        \"ruleHitResult\" : \"miss\",\n" +
            "        \"jcl_20230602000001_19_terminal\" : false\n" +
            "    },\n" +
            "    \"requestId\" : \"285658630\",\n" +
            "    \"ruleHitList\" : [\n" +
            "        {\n" +
            "            \"blockId\" : NumberInt(1526),\n" +
            "            \"ruleCode\" : \"fzgz_20230530000009\",\n" +
            "            \"ruleHitFlag\" : false,\n" +
            "            \"ruleId\" : NumberInt(388),\n" +
            "            \"ruleName\" : \"百融多头\",\n" +
            "            \"ruleScore\" : 0.0,\n" +
            "            \"ruleVersionId\" : NumberInt(1098),\n" +
            "            \"versionNo\" : NumberInt(3)\n" +
            "        },\n" +
            "    ],\n" +
            "    \"updateTime\" : \"2023-07-26 10:36:23\"\n" +
            "}\n" +
            "{\n" +
            "    \"_id\" : ObjectId(\"64c086a73dcadb4d9ff3209a\"),\n" +
            "    \"branchRejectInfo\" : [\n" +
            "\n" +
            "    ],\n" +
            "    \"createTime\" : \"2023-07-26 10:36:23\",\n" +
            "    \"decisionId\" : \"20230726103608572shiyusen00007_1_1\",\n" +
            "    \"requestId\" : \"285658630\",\n" +
            "    \"ruleHitList\" : [\n" +
            "\n" +
            "    ],\n" +
            "    \"updateTime\" : \"2023-07-26 10:36:23\"\n" +
            "}\n" +
            "{\n" +
            "    \"_id\" : ObjectId(\"64c086a83dcadb4d9ff320a6\"),\n" +
            "    \"branchRejectInfo\" : [\n" +
            "\n" +
            "    ],\n" +
            "    \"createTime\" : \"2023-07-26 10:36:24\",\n" +
            "    \"decisionId\" : \"20230726103608572shiyusen00007\",\n" +
            "    \"engineCode\" : \"jcl_20230722000001\",\n" +
            "    \"input\" : {\n" +
            "        \"innerResult\" : \"ssr\",\n" +
            "        \"biz_flow_number\" : \"xxxx\",\n" +
            "        \"sex_ceshi\" : 1.0,\n" +
            "        \"interGroup\" : \"jcl_20230717000003_5\",\n" +
            "        \"user_utm_source\" : \"XYF01-SHUMING\",\n" +
            "        \"huarong3_123\" : 0.0,\n" +
            "        \"str_hit_flg\" : \"hit\",\n" +
            "        \"seq_id\" : \"201802061659392037290\",\n" +
            "        \"credit_channel\" : \"guohuai\",\n" +
            "        \"hit_score\" : NumberInt(1),\n" +
            "        \"age\" : 20.0\n" +
            "    },\n" +
            "    \"requestId\" : \"285658630\",\n" +
            "    \"ruleHitList\" : [\n" +
            "\n" +
            "    ],\n" +
            "    \"updateTime\" : \"2023-07-26 10:36:24\"\n" +
            "}\n";
}
